package com.webchat.act.service;

import com.google.common.collect.Lists;
import com.webchat.act.repository.dao.IResourceBehaviorDAO;
import com.webchat.act.repository.entity.ResourceBehaviorEntity;
import com.webchat.common.constants.ResourceBehaviorConstants;
import com.webchat.common.enums.RedisKeyEnum;
import com.webchat.common.exception.BusinessException;
import com.webchat.common.service.RedisService;
import com.webchat.common.util.DateUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.ZSetOperations;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public abstract class AbstractResourceBehaviorService implements ResourceBehaviorInter {

    protected String behaviorType;
    protected String behaviorTypeName;
    public void setBehaviorType(String behaviorType) {
        this.behaviorType = behaviorType;
        this.behaviorTypeName = ResourceBehaviorConstants.getBehaviorTypeName(behaviorType);
    }

    @Autowired
    IResourceBehaviorDAO resourceBehaviorDAO;
    @Autowired
    RedisService redisService;
    @Autowired
    RedissonClient redissonClient;

    /**
     * 实际互动行为，由具体实现类来实现
     *
     * @param userId
     * @param resourceType
     * @param resourceId
     * @return
     */
    protected abstract boolean doBehavior(String userId, String resourceType, Long resourceId);

    /**
     * 实际取消互动行为，由具体实现类来实现
     *
     * @param userId
     * @param resourceType
     * @param resourceId
     * @return
     */
    protected boolean doCancelBehavior(String userId, String resourceType, Long resourceId) {

        throw new BusinessException("不支持的互动行为");
    }

    @Override
    public long behavior(String userId, String resourceType, Long resourceId) {

        Long behaviorCount = 0L;
        /**
         * 1. 数据持久化 DB
         */
        if (this.doBehavior(userId, resourceType, resourceId)) {

            /**
             * 2. 构建、刷新互动行为数据缓存
             */
            behaviorCount = this.refreshBehaviorCache(userId, resourceType, resourceId, false);
            /**
             * 3. 互动消息 TODO
             */
        }
        return behaviorCount;
    }

    @Override
    public long cancelBehavior(String userId, String resourceType, Long resourceId) {
        Long behaviorCount = 0L;
        /**
         * 1. 数据持久化 DB
         */
        if (this.doCancelBehavior(userId, resourceType, resourceId)) {
            /**
             * 2. 构建、刷新互动行为数据缓存
             */
            behaviorCount = this.refreshBehaviorCache(userId, resourceType, resourceId, true);
        }
        return behaviorCount;
    }


    /**
     * 批量查询资源的互动数量
     *
     * @param resourceType
     * @param resourceIds
     * @return
     */
    @Override
    public Map<Long, Long> countBehavior(String resourceType, List<Long> resourceIds) {
        Map<Long, Long> behaviorCountMap = new HashMap<>();
        String behaviorCacheKey = resourceBehaviorCountCacheHashKey(resourceType);
        List<String> resourceIdArr = resourceIds.stream().map(String::valueOf).collect(Collectors.toList());
        List<String> caches = redisService.hmget(behaviorCacheKey, resourceIdArr);
        for (int i = 0; i < resourceIds.size(); i++) {
            Long resourceId = resourceIds.get(i);
            String cache = caches.get(i);
            Long count;
            if (StringUtils.isNotBlank(cache)) {
                count = Long.valueOf(cache);
            } else {
                // TODO 推荐大家遍历结束后批量刷新
                count = this.initRefreshBehaviorCache(resourceType, resourceId);
            }
            behaviorCountMap.put(resourceId, count);
        }
        return behaviorCountMap;
    }

    @Override
    public Map<Long, Boolean> isBehavior(String userId, String resourceType, List<Long> resourceIds) {

        String cacheKey = this.userBehaviorResourceCacheKey(userId, resourceType);
        if(!redisService.exists(cacheKey)) {
            this.initUserBehaviorResourcesCache(userId, resourceType);
        }
        Set<String> resourceIdSet = resourceIds.stream().map(String::valueOf).collect(Collectors.toSet());
        Map<String, Boolean> behaviorStatusMap = redisService.zContains(cacheKey, resourceIdSet);
        return behaviorStatusMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> Long.parseLong(entry.getKey()),
                        Map.Entry::getValue,
                        (existing, replacement) -> existing
                ));
    }

    public boolean isBehavior(String userId, String resourceType, Long resourceId) {
        Map<Long, Boolean> isBehaviorMap = this.isBehavior(userId, resourceType, Lists.newArrayList(resourceId));
        return ObjectUtils.equals(isBehaviorMap.get(resourceId), true);
    }


    @Override
    public Map<String, Set<String>> listBehaviorUsers(String resourceType, List<Long> resourceIds, int size) {

        List<String> cacheKey = resourceIds.stream().map(rid ->
                                this.resourceBehaviorUserCacheKey(rid, resourceType)).toList();
        List<String> momentIds = resourceIds.stream().map(String::valueOf).toList();

        Map<String, Set<String>> momentBehaviorUserMap = redisService.zreverseRangeByPipelined(cacheKey, momentIds, 0, size);
        return momentBehaviorUserMap;
    }

    private Long refreshBehaviorCache(String userId, String resourceType, Long resourceId, boolean isCancel) {

        /**
         * 1. 刷新资源互动总量
         */
        Long behaviorCount = this.refreshBehaviorCountCache(resourceType, resourceId, isCancel);
        /**
         * 2. 刷新用户维度操作过的资源列表缓存
         */
        this.refreshUserBehaviorResourcesCache(userId, resourceType, resourceId, isCancel);
        /**
         * 3. 刷新资源维度操作过的用户列表缓存
         */
        this.refreshResourceBehaviorUsersCache(userId, resourceType, resourceId, isCancel);
        return behaviorCount;
    }


    /**
     * 刷新用户维度，互动资料列表缓存
     *
     * 这里为什么使用Zset，而不是Set？
     * 因为我们考虑到后续在”个人中心这一场景下，我们需要分页获取指定用户互动过的有序资源列表“
     *
     * @param userId
     * @param resourceType
     * @param resourceId
     * @param isCancel
     */
    private void refreshUserBehaviorResourcesCache(String userId, String resourceType, Long resourceId, boolean isCancel) {
        String cacheKey = this.userBehaviorResourceCacheKey(userId, resourceType);
        if (!redisService.exists(cacheKey)) {
            RLock lock = redissonClient.getLock(RedisKeyEnum.USER_BEHAVIOR_RESOURCE_CACHE_INIT_LOCK.getKey(userId));
            try {
                lock.lock();
                // 双重检查锁，避免并发下重复查库刷新redis
                if (!redisService.exists(cacheKey)) {
                    this.initUserBehaviorResourcesCache(userId, resourceType);
                }
            } finally {
                lock.unlock();
            }
            return;
        }
        if (isCancel) {
            // 取消互动
            redisService.zremove(cacheKey, String.valueOf(resourceId));
            return;
        }
        redisService.zadd(cacheKey, String.valueOf(resourceId), DateUtils.getCurrentTimeMillis(),
                RedisKeyEnum.USER_BEHAVIOR_RESOURCE_ZSET_CACHE.getExpireTime());
    }

    private void initUserBehaviorResourcesCache(String userId, String resourceType) {
        List<ResourceBehaviorEntity> entities =
                resourceBehaviorDAO.findAllByUserBehaviorResource(userId, behaviorType, resourceType);
        if (CollectionUtils.isNotEmpty(entities)) {
            Set<ZSetOperations.TypedTuple<String>> typedTupleSet = new HashSet<>();
            for (ResourceBehaviorEntity resourceBehavior : entities) {
                String value = String.valueOf(resourceBehavior.getResourceIndex());
                double score = resourceBehavior.getUpdateDate() != null ?
                               resourceBehavior.getUpdateDate().getTime() : resourceBehavior.getCreateDate().getTime();
                ZSetOperations.TypedTuple<String> tuple = new DefaultTypedTuple<>(value, score);
                typedTupleSet.add(tuple);
            }
            String cacheKey = this.userBehaviorResourceCacheKey(userId, resourceType);
            redisService.zadd(cacheKey, typedTupleSet, RedisKeyEnum.USER_BEHAVIOR_RESOURCE_ZSET_CACHE.getExpireTime());
        }
    }


    private void initResourceBehaviorUsersCache(Long resourceId, String resourceType) {
        List<ResourceBehaviorEntity> entities =
                resourceBehaviorDAO.findAllByResourceBehaviorUser(resourceId, behaviorType, resourceType);
        if (CollectionUtils.isNotEmpty(entities)) {
            Set<ZSetOperations.TypedTuple<String>> typedTupleSet = new HashSet<>();
            for (ResourceBehaviorEntity resourceBehavior : entities) {
                String value = String.valueOf(resourceBehavior.getUserId());
                double score = resourceBehavior.getUpdateDate() != null ?
                        resourceBehavior.getUpdateDate().getTime() : resourceBehavior.getCreateDate().getTime();
                ZSetOperations.TypedTuple<String> tuple = new DefaultTypedTuple<>(value, score);
                typedTupleSet.add(tuple);
            }
            String cacheKey = this.resourceBehaviorUserCacheKey(resourceId, resourceType);
            redisService.zadd(cacheKey, typedTupleSet, RedisKeyEnum.RESOURCE_BEHAVIOR_USER_ZSET_CACHE.getExpireTime());
        }
    }


    /**
     *
     * @param userId
     * @param resourceType
     * @param resourceId
     * @param isCancel
     */
    private void refreshResourceBehaviorUsersCache(String userId, String resourceType, Long resourceId, boolean isCancel) {
        String cacheKey = this.resourceBehaviorUserCacheKey(resourceId, resourceType);
        if (!redisService.exists(cacheKey)) {
            RLock lock = redissonClient.getLock(RedisKeyEnum.RESOURCE_BEHAVIOR_USER_CACHE_INIT_LOCK.getKey(
                    resourceType, String.valueOf(resourceId)));
            try {
                lock.lock();
                // 双重检查锁，避免并发下重复查库刷新redis
                if (!redisService.exists(cacheKey)) {
                    this.initResourceBehaviorUsersCache(resourceId, resourceType);
                }
            } finally {
                lock.unlock();
            }
            return;
        }
        if (isCancel) {
            // 取消互动
            redisService.zremove(cacheKey, userId);
            return;
        }
        redisService.zadd(cacheKey, userId, DateUtils.getCurrentTimeMillis(),
                RedisKeyEnum.USER_BEHAVIOR_RESOURCE_ZSET_CACHE.getExpireTime());
    }

    /**
     * 基于redis hash结构的原子+-特性实现计数器，记录资源互动最新量
     *
     * @param resourceType
     * @param resourceId
     * @param isCancel
     * @return
     */
    private Long refreshBehaviorCountCache(String resourceType, Long resourceId, boolean isCancel) {
        String cacheKey = this.resourceBehaviorCountCacheHashKey(resourceType);
        Long behaviorCount;
        if (isCancel) {
            behaviorCount = redisService.hdecrex(cacheKey, String.valueOf(resourceId));
        } else {
            behaviorCount = redisService.hincrex(cacheKey, String.valueOf(resourceId));
        }
        redisService.expire(cacheKey, RedisKeyEnum.RESOURCE_BEHAVIOR_COUNT_CACHE.getExpireTime());
        if (behaviorCount == 1 || behaviorCount == 0 || behaviorCount == -1) {
            // 说明：之前没有互动或者redis过期，都重新查库初始化一次redis缓存
            behaviorCount = this.initRefreshBehaviorCache(resourceType, resourceId);
        }
        return behaviorCount;
    }

    private Long initRefreshBehaviorCache(String resourceType, Long resourceId) {
        Long behaviorCount = resourceBehaviorDAO.countByBehaviorTypeAndResourceTypeAndResourceIndexAndStatusTrue(
                behaviorType, resourceType, resourceId);
        String countHashKey = this.resourceBehaviorCountCacheHashKey(resourceType);
        redisService.hset(countHashKey, String.valueOf(resourceId), String.valueOf(behaviorCount),
                RedisKeyEnum.RESOURCE_BEHAVIOR_COUNT_CACHE.getExpireTime());
        return behaviorCount;
    }

    private String resourceBehaviorCountCacheHashKey(String resourceType) {
        return RedisKeyEnum.RESOURCE_BEHAVIOR_COUNT_CACHE.getKey(this.behaviorType, resourceType);
    }

    private String userBehaviorResourceCacheKey(String userId, String resourceType) {
        return RedisKeyEnum.USER_BEHAVIOR_RESOURCE_ZSET_CACHE.getKey(behaviorType, resourceType, userId);
    }

    private String resourceBehaviorUserCacheKey(Long resourceId, String resourceType) {
        return RedisKeyEnum.USER_BEHAVIOR_RESOURCE_ZSET_CACHE.getKey(behaviorType, resourceType, String.valueOf(resourceId));
    }
}
